agentmux_srv\backend\blockcontroller/
health.rs

1// Copyright 2025, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Agent health/liveness monitoring.
5//!
6//! Watches subprocess output activity, classifies errors, and emits
7//! `agenthealth` WPS events when health state transitions occur.
8//!
9//! Design: docs/specs/agent-health-design.md
10
11use std::collections::VecDeque;
12use std::sync::{Arc, Mutex};
13use std::time::{Duration, Instant};
14
15use serde::Serialize;
16
17use crate::backend::wps;
18
19// ---- Health states ----
20
21/// Agent health status (orthogonal to shellprocstatus).
22#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
23#[serde(rename_all = "lowercase")]
24pub enum AgentHealth {
25    Healthy,
26    Idle,
27    Degraded,
28    Stalled,
29    Dead,
30    Exited,
31}
32
33impl AgentHealth {
34    pub fn as_str(&self) -> &'static str {
35        match self {
36            Self::Healthy => "healthy",
37            Self::Idle => "idle",
38            Self::Degraded => "degraded",
39            Self::Stalled => "stalled",
40            Self::Dead => "dead",
41            Self::Exited => "exited",
42        }
43    }
44}
45
/// Severity assigned to an error seen in the agent's output stream.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ErrorClass {
    /// Counted towards the degraded threshold; does not by itself mark the agent dead.
    Transient,
    /// Marks the agent dead while the error remains in the tracking window.
    Fatal,
}
52
53// ---- Event payload ----
54
55/// WPS event payload for health transitions.
56#[derive(Debug, Clone, Serialize)]
57pub struct AgentHealthEvent {
58    pub blockid: String,
59    pub health: String,
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub exit_code: Option<i32>,
62    pub detail: String,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub last_error: Option<String>,
65}
66
67// ---- Error tracker ----
68
69/// Sliding-window error tracker.
70struct ErrorTracker {
71    window: VecDeque<(Instant, ErrorClass)>,
72    window_duration: Duration,
73    consecutive_transient: u32,
74}
75
76impl ErrorTracker {
77    fn new(window_duration: Duration) -> Self {
78        Self {
79            window: VecDeque::new(),
80            window_duration,
81            consecutive_transient: 0,
82        }
83    }
84
85    fn prune(&mut self) {
86        let cutoff = Instant::now() - self.window_duration;
87        while self.window.front().is_some_and(|(t, _)| *t < cutoff) {
88            self.window.pop_front();
89        }
90    }
91
92    fn record(&mut self, class: ErrorClass) {
93        self.prune();
94        match class {
95            ErrorClass::Transient => self.consecutive_transient += 1,
96            ErrorClass::Fatal => self.consecutive_transient = 0,
97        }
98        self.window.push_back((Instant::now(), class));
99    }
100
101    fn record_success(&mut self) {
102        self.consecutive_transient = 0;
103    }
104
105    fn has_fatal(&self) -> bool {
106        self.window.iter().any(|(_, c)| *c == ErrorClass::Fatal)
107    }
108
109    fn transient_count(&self) -> usize {
110        self.window.iter().filter(|(_, c)| *c == ErrorClass::Transient).count()
111    }
112
113    fn reset(&mut self) {
114        self.window.clear();
115        self.consecutive_transient = 0;
116    }
117}
118
119// ---- Health monitor ----
120
121/// Per-block health monitor inner state.
122struct HealthMonitorInner {
123    current_health: AgentHealth,
124    active_turn: bool,
125    last_output_ts: Instant,
126    last_meaningful_ts: Instant,
127    errors: ErrorTracker,
128    exit_code: Option<i32>,
129    last_error: Option<String>,
130}
131
132/// Per-block agent health monitor.
133///
134/// Tracks output activity and error rates, computes health state,
135/// and emits WPS events on state transitions.
136pub struct HealthMonitor {
137    block_id: String,
138    inner: Mutex<HealthMonitorInner>,
139    broker: Option<Arc<wps::Broker>>,
140}
141
142impl HealthMonitor {
143    /// Stall threshold: no meaningful output for 30s during active turn.
144    const STALL_SECS: u64 = 30;
145    /// Dead threshold: no meaningful output for 120s during active turn.
146    const DEAD_SECS: u64 = 120;
147    /// Error window duration.
148    const ERROR_WINDOW_SECS: u64 = 300; // 5 minutes
149    /// Transient error count threshold for degraded.
150    const DEGRADED_TRANSIENT_THRESHOLD: usize = 5;
151
152    pub fn new(block_id: String, broker: Option<Arc<wps::Broker>>) -> Self {
153        let now = Instant::now();
154        Self {
155            block_id,
156            inner: Mutex::new(HealthMonitorInner {
157                current_health: AgentHealth::Idle,
158                active_turn: false,
159                last_output_ts: now,
160                last_meaningful_ts: now,
161                errors: ErrorTracker::new(Duration::from_secs(Self::ERROR_WINDOW_SECS)),
162                exit_code: None,
163                last_error: None,
164            }),
165            broker,
166        }
167    }
168
169    /// Called when a new turn starts (subprocess spawned).
170    pub fn set_active_turn(&self, active: bool) {
171        let mut inner = self.inner.lock().unwrap();
172        inner.active_turn = active;
173        let now = Instant::now();
174        inner.last_output_ts = now;
175        inner.last_meaningful_ts = now;
176        if active {
177            inner.errors.reset();
178            inner.exit_code = None;
179        }
180        drop(inner);
181        self.evaluate_and_transition();
182    }
183
184    /// Called when the subprocess exits.
185    pub fn set_exited(&self, exit_code: i32) {
186        let mut inner = self.inner.lock().unwrap();
187        inner.active_turn = false;
188        inner.exit_code = Some(exit_code);
189        drop(inner);
190        self.evaluate_and_transition();
191    }
192
193    /// Called for each output line from stdout.
194    /// `meaningful` is false for rate_limit_event and similar non-progress events.
195    pub fn record_output(&self, meaningful: bool) {
196        let mut inner = self.inner.lock().unwrap();
197        let now = Instant::now();
198        inner.last_output_ts = now;
199        if meaningful {
200            inner.last_meaningful_ts = now;
201            inner.errors.record_success();
202        }
203        drop(inner);
204        // Don't evaluate on every output line — the watchdog handles periodic checks.
205        // Only re-evaluate if we were previously stalled/dead (recovery path).
206        let health = self.inner.lock().unwrap().current_health.clone();
207        if health == AgentHealth::Stalled || health == AgentHealth::Dead {
208            self.evaluate_and_transition();
209        }
210    }
211
212    /// Called when an error is detected in the output stream.
213    pub fn record_error(&self, class: ErrorClass, message: String) {
214        let mut inner = self.inner.lock().unwrap();
215        inner.errors.record(class);
216        inner.last_error = Some(message);
217        drop(inner);
218        self.evaluate_and_transition();
219    }
220
221    /// Whether there's an active turn in progress.
222    pub fn is_active_turn(&self) -> bool {
223        self.inner.lock().unwrap().active_turn
224    }
225
226    /// Periodic health check — call this every ~5 seconds while a turn is active.
227    pub fn check(&self) {
228        self.evaluate_and_transition();
229    }
230
231    /// Compute current health and emit event if it changed.
232    fn evaluate_and_transition(&self) {
233        let mut inner = self.inner.lock().unwrap();
234        let new_health = Self::compute_health(&inner);
235
236        if new_health != inner.current_health {
237            let old = inner.current_health.clone();
238            inner.current_health = new_health.clone();
239            let detail = Self::make_detail(&inner, &new_health);
240            let event = AgentHealthEvent {
241                blockid: self.block_id.clone(),
242                health: new_health.as_str().to_string(),
243                exit_code: inner.exit_code,
244                detail,
245                last_error: inner.last_error.clone(),
246            };
247            drop(inner);
248
249            tracing::info!(
250                block_id = %self.block_id,
251                old = ?old,
252                new = ?new_health,
253                "agent health transition"
254            );
255            self.publish_health(event);
256        }
257    }
258
259    /// Composite health computation.
260    fn compute_health(inner: &HealthMonitorInner) -> AgentHealth {
261        // Process exited?
262        if let Some(code) = inner.exit_code {
263            if code == 0 {
264                return AgentHealth::Idle; // Normal turn completion
265            }
266            return AgentHealth::Exited;
267        }
268
269        // Fatal error?
270        if inner.errors.has_fatal() {
271            return AgentHealth::Dead;
272        }
273
274        // Not in an active turn?
275        if !inner.active_turn {
276            return AgentHealth::Idle;
277        }
278
279        // Check output silence
280        let silence = inner.last_meaningful_ts.elapsed();
281        if silence > Duration::from_secs(Self::DEAD_SECS) {
282            return AgentHealth::Dead;
283        }
284        if silence > Duration::from_secs(Self::STALL_SECS) {
285            return AgentHealth::Stalled;
286        }
287
288        // Check transient error rate
289        if inner.errors.transient_count() >= Self::DEGRADED_TRANSIENT_THRESHOLD {
290            return AgentHealth::Degraded;
291        }
292
293        AgentHealth::Healthy
294    }
295
296    /// Generate human-readable detail string.
297    fn make_detail(inner: &HealthMonitorInner, health: &AgentHealth) -> String {
298        match health {
299            AgentHealth::Healthy => "Agent is responding normally".to_string(),
300            AgentHealth::Idle => "Waiting for next message".to_string(),
301            AgentHealth::Degraded => {
302                format!(
303                    "{} transient errors in the last 5 minutes",
304                    inner.errors.transient_count()
305                )
306            }
307            AgentHealth::Stalled => {
308                let secs = inner.last_meaningful_ts.elapsed().as_secs();
309                format!("No output for {}s", secs)
310            }
311            AgentHealth::Dead => {
312                if inner.errors.has_fatal() {
313                    inner
314                        .last_error
315                        .clone()
316                        .unwrap_or_else(|| "Fatal error detected".to_string())
317                } else {
318                    let secs = inner.last_meaningful_ts.elapsed().as_secs();
319                    format!("Unresponsive for {}s", secs)
320                }
321            }
322            AgentHealth::Exited => {
323                format!("Exited with code {}", inner.exit_code.unwrap_or(-1))
324            }
325        }
326    }
327
328    /// Publish health event via WPS broker.
329    fn publish_health(&self, event: AgentHealthEvent) {
330        if let Some(ref broker) = self.broker {
331            let wps_event = wps::WaveEvent {
332                event: wps::EVENT_AGENT_HEALTH.to_string(),
333                scopes: vec![format!("block:{}", self.block_id)],
334                sender: String::new(),
335                persist: 0,
336                data: serde_json::to_value(&event).ok(),
337            };
338            broker.publish(wps_event);
339        }
340    }
341}
342
343// ---- Error classifier for NDJSON lines ----
344
345/// Classify a parsed NDJSON line for health monitoring.
346/// Returns (is_meaningful, optional_error).
347pub fn classify_output_line(
348    parsed: &serde_json::Value,
349) -> (bool, Option<(ErrorClass, String)>) {
350    let event_type = parsed.get("type").and_then(|v| v.as_str()).unwrap_or("");
351
352    match event_type {
353        "rate_limit_event" => {
354            (false, Some((ErrorClass::Transient, "Rate limited".to_string())))
355        }
356        "result" => {
357            let is_error = parsed
358                .get("is_error")
359                .and_then(|v| v.as_bool())
360                .unwrap_or(false);
361            if !is_error {
362                return (true, None);
363            }
364            let msg = parsed
365                .get("error")
366                .or_else(|| parsed.get("error_message"))
367                .and_then(|v| v.as_str())
368                .unwrap_or("")
369                .to_lowercase();
370
371            let class = if msg.contains("unauthorized")
372                || msg.contains("401")
373                || msg.contains("forbidden")
374                || msg.contains("403")
375                || msg.contains("token expired")
376                || msg.contains("authentication")
377            {
378                ErrorClass::Fatal
379            } else if msg.contains("overloaded")
380                || msg.contains("503")
381                || msg.contains("500")
382                || msg.contains("rate")
383                || msg.contains("capacity")
384            {
385                ErrorClass::Transient
386            } else {
387                // Unknown errors default to fatal (design principle: safer to over-alert)
388                ErrorClass::Fatal
389            };
390
391            (true, Some((class, msg)))
392        }
393        // stream_event wrapper — check inner event
394        "stream_event" => {
395            if let Some(inner) = parsed.get("event") {
396                return classify_output_line(inner);
397            }
398            (true, None)
399        }
400        _ => (true, None),
401    }
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `json` and runs the result through the classifier.
    fn classify(json: &str) -> (bool, Option<(ErrorClass, String)>) {
        let value: serde_json::Value = serde_json::from_str(json).unwrap();
        classify_output_line(&value)
    }

    /// Snapshot of the monitor's current health state.
    fn health_of(monitor: &HealthMonitor) -> AgentHealth {
        monitor.inner.lock().unwrap().current_health.clone()
    }

    #[test]
    fn test_error_tracker_basic() {
        let mut tracker = ErrorTracker::new(Duration::from_secs(300));

        // Nothing recorded yet.
        assert_eq!(tracker.transient_count(), 0);
        assert!(!tracker.has_fatal());

        // One transient error is counted but is not fatal.
        tracker.record(ErrorClass::Transient);
        assert!(!tracker.has_fatal());
        assert_eq!(tracker.transient_count(), 1);

        // A fatal error is reported as such.
        tracker.record(ErrorClass::Fatal);
        assert!(tracker.has_fatal());
    }

    #[test]
    fn test_classify_rate_limit() {
        let (meaningful, error) = classify(r#"{"type":"rate_limit_event"}"#);
        assert!(!meaningful);
        assert!(matches!(error, Some((ErrorClass::Transient, _))));
    }

    #[test]
    fn test_classify_auth_error() {
        let (_, error) = classify(
            r#"{"type":"result","is_error":true,"error":"Unauthorized: token expired"}"#,
        );
        assert!(matches!(error, Some((ErrorClass::Fatal, _))));
    }

    #[test]
    fn test_classify_overloaded() {
        let (_, error) = classify(
            r#"{"type":"result","is_error":true,"error":"Service overloaded, try again"}"#,
        );
        assert!(matches!(error, Some((ErrorClass::Transient, _))));
    }

    #[test]
    fn test_classify_normal_result() {
        let (meaningful, error) =
            classify(r#"{"type":"result","is_error":false,"total_cost_usd":0.05}"#);
        assert!(meaningful);
        assert!(error.is_none());
    }

    #[test]
    fn test_health_monitor_lifecycle() {
        let monitor = HealthMonitor::new("test-block".to_string(), None);

        // Initial state is idle
        assert_eq!(health_of(&monitor), AgentHealth::Idle);

        // Start a turn
        monitor.set_active_turn(true);
        assert_eq!(health_of(&monitor), AgentHealth::Healthy);

        // Record normal output
        monitor.record_output(true);
        assert_eq!(health_of(&monitor), AgentHealth::Healthy);

        // Exit normally
        monitor.set_exited(0);
        assert_eq!(health_of(&monitor), AgentHealth::Idle);
    }

    #[test]
    fn test_health_monitor_fatal_error() {
        let monitor = HealthMonitor::new("test-block".to_string(), None);
        monitor.set_active_turn(true);

        monitor.record_error(ErrorClass::Fatal, "Unauthorized".to_string());
        assert_eq!(health_of(&monitor), AgentHealth::Dead);
    }
}